home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2008 February / PCWFEB08.iso / Software / Freeware / Miro 1.0 / Miro_Installer.exe / Miro_Downloader.exe / test / framework.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2007-11-12  |  6.8 KB  |  187 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.5)
  3.  
  4. import unittest
  5. import database
  6. import eventloop
  7. import frontend
  8. import app
  9. import downloader
  10. import views
  11. import util
  12. import databaseupgrade
  13. import storedatabase
  14. import subscription
  15. import selection
  16. from time import sleep
  17. util.setupLogging()
  18.  
  19. class HadToStopEventLoop(Exception):
  20.     pass
  21.  
  22.  
  23. class DummyMainFrame:
  24.     
  25.     def __init__(self):
  26.         self.displays = { }
  27.         self.mainDisplay = 'mainDisplay'
  28.         self.channelsDisplay = 'channelsDisplay'
  29.         self.collectionDisplay = 'collectionDisplay'
  30.         self.videoInfoDisplay = 'videoInfoDisplay'
  31.  
  32.     
  33.     def selectDisplay(self, display, area):
  34.         self.displays[area] = display
  35.  
  36.     
  37.     def getDisplay(self, area):
  38.         return self.displays.get(area)
  39.  
  40.     
  41.     def onSelectedTabChange(self, tabType, multiple, guideURL, videoFilename):
  42.         pass
  43.  
  44.  
  45.  
  46. class DummyVideoDisplay:
  47.     
  48.     def fileDuration(self, filename, callback):
  49.         pass
  50.  
  51.     
  52.     def fillMovieData(self, filename, movie_data, callback):
  53.         pass
  54.  
  55.  
  56.  
  57. class DummyController:
  58.     
  59.     def __init__(self):
  60.         self.selection = selection.SelectionHandler()
  61.         self.frame = DummyMainFrame()
  62.         self.videoDisplay = DummyVideoDisplay()
  63.  
  64.  
  65.  
  66. class DemocracyTestCase(unittest.TestCase):
  67.     
  68.     def setUp(self):
  69.         database.set_thread()
  70.         views.initialize()
  71.         util.chatter = False
  72.         self.oldUtilDotFailed = util.failed
  73.         self.failedCalled = False
  74.         self.utilDotFailedOkay = False
  75.         
  76.         def newUtilDotFailed(*args, **kwargs):
  77.             if self.utilDotFailedOkay:
  78.                 self.failedCalled = True
  79.             else:
  80.                 print 'util.failed called!'
  81.                 print 'args: %s kwargs: %s' % (args, kwargs)
  82.                 import traceback
  83.                 if kwargs.get('withExn'):
  84.                     traceback.print_exc()
  85.                 else:
  86.                     traceback.print_stack()
  87.                 raise Exception('util.failed called')
  88.  
  89.         util.failed = newUtilDotFailed
  90.         app.controller = DummyController()
  91.  
  92.     
  93.     def tearDown(self):
  94.         util.chatter = True
  95.         database.resetDefaultDatabase()
  96.         eventloop._eventLoop = eventloop.EventLoop()
  97.  
  98.  
  99.  
  100. class EventLoopTest(DemocracyTestCase):
  101.     
  102.     def setUp(self):
  103.         DemocracyTestCase.setUp(self)
  104.         self.hadToStopEventLoop = False
  105.  
  106.     
  107.     def stopEventLoop(self, abnormal = True):
  108.         self.hadToStopEventLoop = abnormal
  109.         eventloop.quit()
  110.  
  111.     
  112.     def runPendingIdles(self):
  113.         idleQueue = eventloop._eventLoop.idleQueue
  114.         urgentQueue = eventloop._eventLoop.urgentQueue
  115.         while idleQueue.hasPendingIdle() or urgentQueue.hasPendingIdle():
  116.             urgentQueue.processIdles()
  117.             idleQueue.processNextIdle()
  118.  
  119.     
  120.     def runEventLoop(self, timeout = 10, timeoutNormal = False):
  121.         eventloop.threadPoolInit()
  122.         
  123.         try:
  124.             self.hadToStopEventLoop = False
  125.             timeout = eventloop.addTimeout(timeout, self.stopEventLoop, 'Stop test event loop')
  126.             eventloop._eventLoop.quitFlag = False
  127.             eventloop._eventLoop.loop()
  128.             if self.hadToStopEventLoop and not timeoutNormal:
  129.                 raise HadToStopEventLoop()
  130.             else:
  131.                 timeout.cancel()
  132.         finally:
  133.             eventloop.threadPoolQuit()
  134.  
  135.  
  136.     
  137.     def addTimeout(self, delay, function, name, args = None, kwargs = None):
  138.         eventloop.addTimeout(delay, function, name, args, kwargs)
  139.  
  140.     
  141.     def addWriteCallback(self, socket, callback):
  142.         eventloop.addWriteCallback(socket, callback)
  143.  
  144.     
  145.     def removeWriteCallback(self, socket):
  146.         eventloop.removeWriteCallback(socket)
  147.  
  148.     
  149.     def addIdle(self, function, name, args = None, kwargs = None):
  150.         eventloop.addIdle(function, name, args = None, kwargs = None)
  151.  
  152.     
  153.     def hasIdles(self):
  154.         if eventloop._eventLoop.idleQueue.queue.empty():
  155.             pass
  156.         return not eventloop._eventLoop.urgentQueue.queue.empty()
  157.  
  158.     
  159.     def processThreads(self):
  160.         eventloop._eventLoop.threadPool.initThreads()
  161.         while not eventloop._eventLoop.threadPool.queue.empty():
  162.             sleep(0.05)
  163.         eventloop._eventLoop.threadPool.closeThreads()
  164.  
  165.     
  166.     def processIdles(self):
  167.         eventloop._eventLoop.idleQueue.processIdles()
  168.         eventloop._eventLoop.urgentQueue.processIdles()
  169.  
  170.  
  171.  
  172. class DownloaderTestCase(EventLoopTest):
  173.     
  174.     def setUp(self):
  175.         EventLoopTest.setUp(self)
  176.         app.delegate = frontend.UIBackendDelegate()
  177.         downloader.startupDownloader()
  178.  
  179.     
  180.     def tearDown(self):
  181.         downloader.shutdownDownloader(eventloop.quit)
  182.         self.runEventLoop()
  183.         app.delegate = None
  184.         EventLoopTest.tearDown(self)
  185.  
  186.  
  187.